package com.fintech.pangu.rocketmq.metrics;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import lombok.extern.slf4j.Slf4j;

/**
 * 实现RocketMQMetricsCollector针对具体事件如何度量的逻辑
 * 事件->度量的转化，暴露标记度量接口给具体Metrics实现
 */
@Slf4j
public abstract class AbstractRocketMQMetricsCollector implements RocketMQMetricsCollector {

    //============= 发送事件 =============//
    /**
     * 客户端发送成功
     */
    @Override
    public void sendSuccess(RocketMQSendMessageContext context) {
        try {
            markSendMessageSuccess(context);
        }
        catch(Exception e) {
            log.error("Error while computing metrics in sendSuccess: " + e.getMessage());
        }
    }

    /**
     * 客户端发送失败
     */
    @Override
    public void sendFailure(RocketMQSendMessageContext context) {
        try {
            // 提取responseCode
            extractResponseCode(context);

            markSendMessageFailed(context);
        }
        catch(Exception e) {
            log.error("Error while computing metrics in sendFailure: " + e.getMessage());
        }
    }

    /**
     * 异常情况统一处理，提取responseCode
     * @param context
     */
    private void extractResponseCode(RocketMQSendMessageContext context) {
        int responseCode = -1;
        Exception exception = context.getException();
        if(exception instanceof MQBrokerException) {
            responseCode = ((MQBrokerException)exception).getResponseCode();
        }
        else if(exception instanceof MQClientException){
            responseCode = ((MQClientException)exception).getResponseCode();
        }
        if(responseCode != -1){
            context.setResponseCode(responseCode);
        }
    }

    /**
     * 内部发送成功
     * @param context
     */
    @Override
    public void sendKernelSuccess(RocketMQSendMessageContext context) {
        try {
            markSendKernelMessageSuccess(context);
        }
        catch(Exception e) {
            log.error("Error while computing metrics in sendKernelSuccess: " + e.getMessage());
        }
    }

    /**
     * 内部发送失败
     * @param context
     */
    @Override
    public void sendKernelFailure(RocketMQSendMessageContext context) {
        try {
            // 提取responseCode
            extractResponseCode(context);

            markSendKernelMessageFailure(context);
        }
        catch(Exception e) {
            log.error("Error while computing metrics in sendKernelFailure: " + e.getMessage());
        }
    }


    //============= 消费事件 =============//
    @Override
    public void consumeSuccess(RocketMQConsumeMessageContext context) {
        try {
            markConsumeMessageSuccess(context);
        }
        catch(Exception e) {
            log.error("Error while computing metrics in consumeSuccess: " + e.getMessage());
        }
    }

    @Override
    public void consumeFailure(RocketMQConsumeMessageContext context) {
        try {
            markConsumeMessageFailure(context);
        }
        catch(Exception e) {
            log.error("Error while computing metrics in consumeFailure: " + e.getMessage());
        }
    }

    @Override
    public void consumeKernelSuccess(RocketMQConsumeMessageContext context) {
        try {
            markConsumeKernelMessageSuccess(context);
        }
        catch(Exception e) {
            log.error("Error while computing metrics in consumeKernelSuccess: " + e.getMessage());
        }
    }

    @Override
    public void consumeKernelFailure(RocketMQConsumeMessageContext context) {
        try {
            markConsumeKernelMessageFailure(context);
        }
        catch(Exception e) {
            log.error("Error while computing metrics in consumeKernelFailure: " + e.getMessage());
        }
    }



    //============= 发送标记 =============//
    /**
     * 标记客户端发送成功
     */
    protected abstract void markSendMessageSuccess(RocketMQSendMessageContext context);

    /**
     * 标记客户端发送失败
     */
    protected abstract void markSendMessageFailed(RocketMQSendMessageContext context);

    /**
     * 标记内部发送成功
     */
    protected abstract void markSendKernelMessageSuccess(RocketMQSendMessageContext context);

    /**
     * 标记内部发送失败
     */
    protected abstract void markSendKernelMessageFailure(RocketMQSendMessageContext context);


    //============= 消费标记 =============//
    /**
     * 标记消费端消费成功
     */
    protected abstract void markConsumeMessageSuccess(RocketMQConsumeMessageContext context);

    /**
     * 标记消费端消费失败
     */
    protected abstract void markConsumeMessageFailure(RocketMQConsumeMessageContext context);

    /**
     * 标记消费端内部消费成功
     */
    protected abstract void markConsumeKernelMessageSuccess(RocketMQConsumeMessageContext context);

    /**
     * 标记消费端内部消费失败
     */
    protected abstract void markConsumeKernelMessageFailure(RocketMQConsumeMessageContext context);


}
